/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */
package java.util.stream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.IntFunction;


/**
 * Factory methods for transforming streams into sorted streams.
 *
 * @since 1.8
 */
final class SortedOps {

  private SortedOps() {
  }

  /**
   * Appends a "sorted" operation to the provided stream.
   *
   * @param <T> the type of both input and output elements
   * @param upstream a reference stream with element type T
   */
  static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
    return new OfRef<>(upstream);
  }

  /**
   * Appends a "sorted" operation to the provided stream.
   *
   * @param <T> the type of both input and output elements
   * @param upstream a reference stream with element type T
   * @param comparator the comparator to order elements by
   */
  static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
      Comparator<? super T> comparator) {
    return new OfRef<>(upstream, comparator);
  }

  /**
   * Appends a "sorted" operation to the provided stream.
   *
   * @param <T> the type of both input and output elements
   * @param upstream a reference stream with element type T
   */
  static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
    return new OfInt(upstream);
  }

  /**
   * Appends a "sorted" operation to the provided stream.
   *
   * @param <T> the type of both input and output elements
   * @param upstream a reference stream with element type T
   */
  static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
    return new OfLong(upstream);
  }

  /**
   * Appends a "sorted" operation to the provided stream.
   *
   * @param <T> the type of both input and output elements
   * @param upstream a reference stream with element type T
   */
  static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
    return new OfDouble(upstream);
  }

  /**
   * Specialized subtype for sorting reference streams
   */
  private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {

    /**
     * Comparator used for sorting
     */
    private final boolean isNaturalSort;
    private final Comparator<? super T> comparator;

    /**
     * Sort using natural order of {@literal <T>} which must be
     * {@code Comparable}.
     */
    OfRef(AbstractPipeline<?, T, ?> upstream) {
      super(upstream, StreamShape.REFERENCE,
          StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
      this.isNaturalSort = true;
      // Will throw CCE when we try to sort if T is not Comparable
      @SuppressWarnings("unchecked")
      Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
      this.comparator = comp;
    }

    /**
     * Sort using the provided comparator.
     *
     * @param comparator The comparator to be used to evaluate ordering.
     */
    OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
      super(upstream, StreamShape.REFERENCE,
          StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
      this.isNaturalSort = false;
      this.comparator = Objects.requireNonNull(comparator);
    }

    @Override
    public Sink<T> opWrapSink(int flags, Sink<T> sink) {
      Objects.requireNonNull(sink);

      // If the input is already naturally sorted and this operation
      // also naturally sorted then this is a no-op
      if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) {
        return sink;
      } else if (StreamOpFlag.SIZED.isKnown(flags)) {
        return new SizedRefSortingSink<>(sink, comparator);
      } else {
        return new RefSortingSink<>(sink, comparator);
      }
    }

    @Override
    public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
        Spliterator<P_IN> spliterator,
        IntFunction<T[]> generator) {
      // If the input is already naturally sorted and this operation
      // naturally sorts then collect the output
      if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
        return helper.evaluate(spliterator, false, generator);
      } else {
        // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
        T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
        Arrays.parallelSort(flattenedData, comparator);
        return Nodes.node(flattenedData);
      }
    }
  }

  /**
   * Specialized subtype for sorting int streams.
   */
  private static final class OfInt extends IntPipeline.StatefulOp<Integer> {

    OfInt(AbstractPipeline<?, Integer, ?> upstream) {
      super(upstream, StreamShape.INT_VALUE,
          StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    }

    @Override
    public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
      Objects.requireNonNull(sink);

      if (StreamOpFlag.SORTED.isKnown(flags)) {
        return sink;
      } else if (StreamOpFlag.SIZED.isKnown(flags)) {
        return new SizedIntSortingSink(sink);
      } else {
        return new IntSortingSink(sink);
      }
    }

    @Override
    public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
        Spliterator<P_IN> spliterator,
        IntFunction<Integer[]> generator) {
      if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
        return helper.evaluate(spliterator, false, generator);
      } else {
        Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);

        int[] content = n.asPrimitiveArray();
        Arrays.parallelSort(content);

        return Nodes.node(content);
      }
    }
  }

  /**
   * Specialized subtype for sorting long streams.
   */
  private static final class OfLong extends LongPipeline.StatefulOp<Long> {

    OfLong(AbstractPipeline<?, Long, ?> upstream) {
      super(upstream, StreamShape.LONG_VALUE,
          StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    }

    @Override
    public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
      Objects.requireNonNull(sink);

      if (StreamOpFlag.SORTED.isKnown(flags)) {
        return sink;
      } else if (StreamOpFlag.SIZED.isKnown(flags)) {
        return new SizedLongSortingSink(sink);
      } else {
        return new LongSortingSink(sink);
      }
    }

    @Override
    public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
        Spliterator<P_IN> spliterator,
        IntFunction<Long[]> generator) {
      if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
        return helper.evaluate(spliterator, false, generator);
      } else {
        Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);

        long[] content = n.asPrimitiveArray();
        Arrays.parallelSort(content);

        return Nodes.node(content);
      }
    }
  }

  /**
   * Specialized subtype for sorting double streams.
   */
  private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {

    OfDouble(AbstractPipeline<?, Double, ?> upstream) {
      super(upstream, StreamShape.DOUBLE_VALUE,
          StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    }

    @Override
    public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
      Objects.requireNonNull(sink);

      if (StreamOpFlag.SORTED.isKnown(flags)) {
        return sink;
      } else if (StreamOpFlag.SIZED.isKnown(flags)) {
        return new SizedDoubleSortingSink(sink);
      } else {
        return new DoubleSortingSink(sink);
      }
    }

    @Override
    public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
        Spliterator<P_IN> spliterator,
        IntFunction<Double[]> generator) {
      if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
        return helper.evaluate(spliterator, false, generator);
      } else {
        Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);

        double[] content = n.asPrimitiveArray();
        Arrays.parallelSort(content);

        return Nodes.node(content);
      }
    }
  }

  /**
   * Abstract {@link Sink} for implementing sort on reference streams.
   *
   * <p>
   * Note: documentation below applies to reference and all primitive sinks.
   * <p>
   * Sorting sinks first accept all elements, buffering then into an array
   * or a re-sizable data structure, if the size of the pipeline is known or
   * unknown respectively.  At the end of the sink protocol those elements are
   * sorted and then pushed downstream.
   * This class records if {@link #cancellationRequested} is called.  If so it
   * can be inferred that the source pushing source elements into the pipeline
   * knows that the pipeline is short-circuiting.  In such cases sub-classes
   * pushing elements downstream will preserve the short-circuiting protocol
   * by calling {@code downstream.cancellationRequested()} and checking the
   * result is {@code false} before an element is pushed.
   * <p>
   * Note that the above behaviour is an optimization for sorting with
   * sequential streams.  It is not an error that more elements, than strictly
   * required to produce a result, may flow through the pipeline.  This can
   * occur, in general (not restricted to just sorting), for short-circuiting
   * parallel pipelines.
   */
  private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {

    protected final Comparator<? super T> comparator;
    // @@@ could be a lazy final value, if/when support is added
    protected boolean cancellationWasRequested;

    AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
      super(downstream);
      this.comparator = comparator;
    }

    /**
     * Records is cancellation is requested so short-circuiting behaviour
     * can be preserved when the sorted elements are pushed downstream.
     *
     * @return false, as this sink never short-circuits.
     */
    @Override
    public final boolean cancellationRequested() {
      cancellationWasRequested = true;
      return false;
    }
  }

  /**
   * {@link Sink} for implementing sort on SIZED reference streams.
   */
  private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {

    private T[] array;
    private int offset;

    SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
      super(sink, comparator);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      array = (T[]) new Object[(int) size];
    }

    @Override
    public void end() {
      Arrays.sort(array, 0, offset, comparator);
      downstream.begin(offset);
      if (!cancellationWasRequested) {
        for (int i = 0; i < offset; i++) {
          downstream.accept(array[i]);
        }
      } else {
        for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) {
          downstream.accept(array[i]);
        }
      }
      downstream.end();
      array = null;
    }

    @Override
    public void accept(T t) {
      array[offset++] = t;
    }
  }

  /**
   * {@link Sink} for implementing sort on reference streams.
   */
  private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {

    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
      super(sink, comparator);
    }

    @Override
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    @Override
    public void end() {
      list.sort(comparator);
      downstream.begin(list.size());
      if (!cancellationWasRequested) {
        list.forEach(downstream::accept);
      } else {
        for (T t : list) {
          if (downstream.cancellationRequested()) {
            break;
          }
          downstream.accept(t);
        }
      }
      downstream.end();
      list = null;
    }

    @Override
    public void accept(T t) {
      list.add(t);
    }
  }

  /**
   * Abstract {@link Sink} for implementing sort on int streams.
   */
  private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {

    protected boolean cancellationWasRequested;

    AbstractIntSortingSink(Sink<? super Integer> downstream) {
      super(downstream);
    }

    @Override
    public final boolean cancellationRequested() {
      cancellationWasRequested = true;
      return false;
    }
  }

  /**
   * {@link Sink} for implementing sort on SIZED int streams.
   */
  private static final class SizedIntSortingSink extends AbstractIntSortingSink {

    private int[] array;
    private int offset;

    SizedIntSortingSink(Sink<? super Integer> downstream) {
      super(downstream);
    }

    @Override
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      array = new int[(int) size];
    }

    @Override
    public void end() {
      Arrays.sort(array, 0, offset);
      downstream.begin(offset);
      if (!cancellationWasRequested) {
        for (int i = 0; i < offset; i++) {
          downstream.accept(array[i]);
        }
      } else {
        for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) {
          downstream.accept(array[i]);
        }
      }
      downstream.end();
      array = null;
    }

    @Override
    public void accept(int t) {
      array[offset++] = t;
    }
  }

  /**
   * {@link Sink} for implementing sort on int streams.
   */
  private static final class IntSortingSink extends AbstractIntSortingSink {

    private SpinedBuffer.OfInt b;

    IntSortingSink(Sink<? super Integer> sink) {
      super(sink);
    }

    @Override
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
    }

    @Override
    public void end() {
      int[] ints = b.asPrimitiveArray();
      Arrays.sort(ints);
      downstream.begin(ints.length);
      if (!cancellationWasRequested) {
        for (int anInt : ints) {
          downstream.accept(anInt);
        }
      } else {
        for (int anInt : ints) {
          if (downstream.cancellationRequested()) {
            break;
          }
          downstream.accept(anInt);
        }
      }
      downstream.end();
    }

    @Override
    public void accept(int t) {
      b.accept(t);
    }
  }

  /**
   * Abstract {@link Sink} for implementing sort on long streams.
   */
  private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {

    protected boolean cancellationWasRequested;

    AbstractLongSortingSink(Sink<? super Long> downstream) {
      super(downstream);
    }

    @Override
    public final boolean cancellationRequested() {
      cancellationWasRequested = true;
      return false;
    }
  }

  /**
   * {@link Sink} for implementing sort on SIZED long streams.
   */
  private static final class SizedLongSortingSink extends AbstractLongSortingSink {

    private long[] array;
    private int offset;

    SizedLongSortingSink(Sink<? super Long> downstream) {
      super(downstream);
    }

    @Override
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      array = new long[(int) size];
    }

    @Override
    public void end() {
      Arrays.sort(array, 0, offset);
      downstream.begin(offset);
      if (!cancellationWasRequested) {
        for (int i = 0; i < offset; i++) {
          downstream.accept(array[i]);
        }
      } else {
        for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) {
          downstream.accept(array[i]);
        }
      }
      downstream.end();
      array = null;
    }

    @Override
    public void accept(long t) {
      array[offset++] = t;
    }
  }

  /**
   * {@link Sink} for implementing sort on long streams.
   */
  private static final class LongSortingSink extends AbstractLongSortingSink {

    private SpinedBuffer.OfLong b;

    LongSortingSink(Sink<? super Long> sink) {
      super(sink);
    }

    @Override
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
    }

    @Override
    public void end() {
      long[] longs = b.asPrimitiveArray();
      Arrays.sort(longs);
      downstream.begin(longs.length);
      if (!cancellationWasRequested) {
        for (long aLong : longs) {
          downstream.accept(aLong);
        }
      } else {
        for (long aLong : longs) {
          if (downstream.cancellationRequested()) {
            break;
          }
          downstream.accept(aLong);
        }
      }
      downstream.end();
    }

    @Override
    public void accept(long t) {
      b.accept(t);
    }
  }

  /**
   * Abstract {@link Sink} for implementing sort on long streams.
   */
  private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {

    protected boolean cancellationWasRequested;

    AbstractDoubleSortingSink(Sink<? super Double> downstream) {
      super(downstream);
    }

    @Override
    public final boolean cancellationRequested() {
      cancellationWasRequested = true;
      return false;
    }
  }

  /**
   * {@link Sink} for implementing sort on SIZED double streams.
   */
  private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {

    private double[] array;
    private int offset;

    SizedDoubleSortingSink(Sink<? super Double> downstream) {
      super(downstream);
    }

    @Override
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      array = new double[(int) size];
    }

    @Override
    public void end() {
      Arrays.sort(array, 0, offset);
      downstream.begin(offset);
      if (!cancellationWasRequested) {
        for (int i = 0; i < offset; i++) {
          downstream.accept(array[i]);
        }
      } else {
        for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) {
          downstream.accept(array[i]);
        }
      }
      downstream.end();
      array = null;
    }

    @Override
    public void accept(double t) {
      array[offset++] = t;
    }
  }

  /**
   * {@link Sink} for implementing sort on double streams.
   */
  private static final class DoubleSortingSink extends AbstractDoubleSortingSink {

    private SpinedBuffer.OfDouble b;

    DoubleSortingSink(Sink<? super Double> sink) {
      super(sink);
    }

    @Override
    public void begin(long size) {
      if (size >= Nodes.MAX_ARRAY_SIZE) {
        throw new IllegalArgumentException(Nodes.BAD_SIZE);
      }
      b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
    }

    @Override
    public void end() {
      double[] doubles = b.asPrimitiveArray();
      Arrays.sort(doubles);
      downstream.begin(doubles.length);
      if (!cancellationWasRequested) {
        for (double aDouble : doubles) {
          downstream.accept(aDouble);
        }
      } else {
        for (double aDouble : doubles) {
          if (downstream.cancellationRequested()) {
            break;
          }
          downstream.accept(aDouble);
        }
      }
      downstream.end();
    }

    @Override
    public void accept(double t) {
      b.accept(t);
    }
  }
}
